package com.cloudansys.core.flink.function;

import com.cloudansys.core.constant.Const;
import com.cloudansys.core.entity.MultiDataEntity;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

import java.util.*;

@Slf4j
public class TenSecMeanApplyFunc implements WindowFunction<List<MultiDataEntity>, List<MultiDataEntity>, String, TimeWindow> {

    /**
     * 使用【第6个指标】进行前十秒均值计算，计算所得的数值设置成应变类型的【第5个指标】
     * 对窗口内的元素进行处理：
     * 1、每次窗口输出的元素为当前时间的元素，只是把传感器的第五个指标值设成了前十秒的平均值
     */
    @Override
    public void apply(String k, TimeWindow timeWindow, Iterable<List<MultiDataEntity>> elements, Collector<List<MultiDataEntity>> out) throws Exception {
        // 因为，窗口有重叠，一个流元素会同时属于多个窗口，这里把其它窗口对相同元素的操作给去除掉
        for (List<MultiDataEntity> element : elements) {
            for (MultiDataEntity multiDataEntity : element) {
                multiDataEntity.setCount(1);
            }
        }
        // eleMap 的 Key 为：projectId + serialCode
        Map<String, MultiDataEntity> eleMap = new HashMap<>();
        for (List<MultiDataEntity> multiDataEntities : elements) {
            for (MultiDataEntity multiDataEntity : multiDataEntities) {
                String projectId = multiDataEntity.getProjectId();
                String serialCode = multiDataEntity.getSerialCode();
                String pickTime = multiDataEntity.getPickTime();
                Double[] values = multiDataEntity.getValues();
                String key = projectId + serialCode;
                if (eleMap.containsKey(key)) {
                    MultiDataEntity existEntity = eleMap.get(key);
                    String pickTime1 = existEntity.getPickTime();
                    Double[] values1 = existEntity.getValues();
                    // 第五个备用值，先存储前十秒的数据之和，后面再计算均值
                    values[4] = values1[4] + values[5];
                    // 同时更新数值相加的数量
                    int meanLevelCount = existEntity.getCount() + multiDataEntity.getCount();
                    existEntity.setCount(meanLevelCount);
                    existEntity.setValues(values);
                    if (getTimestamp(pickTime) > getTimestamp(pickTime1)) {
                        existEntity.setPickTime(pickTime);
                    }
                } else {
                    // 第五个备用值，先存储前十秒的数据，后面再计算均值
                    values[4] = values[5];
                    multiDataEntity.setValues(values);
                    eleMap.put(key, multiDataEntity);
                }
            }
        }
        // 计算第五个指标的均值
        Collection<MultiDataEntity> entities = eleMap.values();
        for (MultiDataEntity multiDataEntity : entities) {
            Double[] values = multiDataEntity.getValues();
            values[4] = Double.parseDouble(String.format(Const.FORMAT_DOUBLE, values[4] / multiDataEntity.getCount()));
            // 计算后把 count 重置为 1，计算meanLevel分页使用
            multiDataEntity.setCount(1);
            multiDataEntity.setValues(values);
        }
        out.collect(new ArrayList<>(entities));
    }

    /**
     * @param time yyyyMMddHHmmss 格式的时间
     * @return 返回 long 型时间戳
     */
    private long getTimestamp(String time) {
        DateTimeFormatter dtf = DateTimeFormat.forPattern(Const.FMT_TRIM_MILLI);
        return dtf.parseDateTime(time).toDate().getTime();
    }

}
